篇首语:本文由编程笔记#小编为大家整理,主要介绍了C#RabbitMQ入门指南相关的知识,希望对你有一定的参考价值。
RabbitMQ 是采用 erlang 语言实现 AMQP (Advanced Message Queuing Protocol ,高级消息
队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息.
RabbitMQ 是目前非常热门的一款消息中间件,不管是互联网行业还是传统行业都在大量
地使用 RabbitMQ 凭借其高可靠、易扩展、高可用及丰富的功能特性受到越来越多企业的青睐。
RabbitMQ的具体特点可以概括为以下几点。
依赖
本文基于发稿时RabbitMQ的最新版本:3.8.19.
RabbitMQ客户端使用:RabbitMQ.Client 6.2.2
RabbitMQ可视化管理插件安装:官网。首先执行rabbitmq-plugins enable rabbitmq_management
命令,然后打开管理面板:http://localhost:15672/#/
即可,默认用户名密码都是guest。
消息 (Message):是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串、JSON 等,也可以很复杂,比如内嵌对象。
消息队列中间件 (Message Queue Middleware,简称为 MQ) 是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传和消息排队模型,它可以在分布式环境下扩展进程间的通信。它一般有两种传递模式:点对点(P2P, Point-to-Point) 模式和发布/订阅 (Pub/Sub) 模式。
点对点模式是基于队列的,消息生产发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能。
发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题 (topic) ,主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。
RabbitMQ的整体模型架构如下:
Producer:生产者,用来生产消息。并把消息发给交换机(生产者不会把消息直接发给某个队列,很多图你可能会看到生产者直连队列,其实中间隐藏了一个默认的交换机)。生产者也就是发送消息的一方。
Consumer:消费者,用来消费队列里的消息。也就是接受消息的一方。
Exchange:交换机,有些文章会成为交换器。其实这个东西的作用更像是路由器。交换机会根据生产者发过来的消息的routingKey,把消息丢到不同的队列中。
Queue:队列,用来存储交换机丢过来的消息(可以理解为邮箱)。一个队列可以被多个消费者进行消费,此时队列里的消息会按照轮询的方式一个个的分配给下面的消费者(不支持队列层面的广播消费)。
channel: 通道,RabbitMQ 处理的每条 AMQP 指令都是通过通道完成的。如下图所示。通道的存在其实就是为了复用TCP连接,本质上我们也可以使用TCP连接发送命令。但是当应用中有多个线程需要生产或者消费时,就需要创建多个TCP连接,而TCP连接的创建和销毁很费资源。
routingKey:路由键,交换机根据这个的值来决定把消息丢到哪个队列里,没有队列可以接受的话,可能把消息返回给生产者也可能直接丢弃。
Broker:RabbitMQ的服务节点或服务实例。可以简单里的理解为就是一台RabbitMQ服务器。
Binding:绑定,消费者端就行配置,建立队列与某个交换机的关系,这样交换机收到消息之后就知道是否要投递到这个队列了。
可以看到官网的教程里有六种模型:
看起来很多很唬人,但是不要怕,本质上也就以下两种,学起来也很快。
队列的一个消息只能被一个消费者消费,多个消费者可以通过轮询的方式消费也可以通过手动响应ack的方式竞争消费。
当开启一个消费者实例时模型如下:
当开启两个消费者实例时模型如下:
消费者示例:
channel.BasicConsume
的第二个参数为true,表示当消费者收到消息后(非消息的业务逻辑处理完后),会自动发送一个ack给mq表示消息已收到。
static void Main(string[] args)
var factory = new ConnectionFactory() HostName = "localhost" ;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare("hello", false, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, args) =>
byte[] body = args.Body.ToArray();
var msg = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received 0", msg);
;
//第二个参数autoAck为true
channel.BasicConsume("hello", true, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.ReadKey();
生产者示例:
static void Main(string[] args)
var factory = new ConnectionFactory() HostName = "localhost" ;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
//声明队列操作是幂等的,当队列不存在时,会进行创建。
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
Console.WriteLine("请输入要发送的消息内容:");
string msg = null;
while (!string.IsNullOrEmpty(msg = Console.ReadLine()))
var body = Encoding.UTF8.GetBytes(msg);
//body是byte类型,使用了一个名为“”的默认交换机
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
Console.WriteLine("[x] Sent 0", msg);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
当开启多个消费者之后,默认会轮询消费。
当消费者消费完成一个消息之后,手动发送一条ack命令给broker。解决consumer突然死掉之后,导致消息丢失的问题。如果mq一直没收到ack,则会将此消息重新入队列,给其他消费者进行消费。
生产者示例:
task_queue
的durable设置为true,这样及时Broker重启,此队列也不会消失。Persistent
也设置为true。即消息也持久存储,但是并不代表消息会100%不丢失,它只是告诉MQ将消息存储在硬盘上。在MQ收到消息且写入硬盘之前如果挂了,那消息就丢了。如果需要保证100%的可用,可以使用后面小节的“发布确认”功能。 static void Main(string[] args)
var factory = new ConnectionFactory() HostName = "localhost" ;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
//durable设置为了true,队列持久化
channel.QueueDeclare(queue: "task_quene", durable: true, exclusive: false, autoDelete: false, arguments: null);
var props = channel.CreateBasicProperties();
//消息持久化
props.Persistent = true;
Console.WriteLine("请输入要发送的消息内容:");
string msg = null;
while (!string.IsNullOrEmpty(msg = Console.ReadLine()))
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "", routingKey: "task_quene", basicProperties: props, body: body);
Console.WriteLine("[x] Sent 0", msg);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
消费者示例:
默认情况下MQ会按照worker的顺序把队列里的消息一个个的分给worker,这种分配消息的方式有一定的弊端,假如有两个worker且队列里的消息根据耗时长短间隔排列。这样所有耗时长的消息都会被分给worker1,短的分配给worker2. 造成worker2长时间空闲。所以就可以通过设置Qos的方式来改善,channel.BasicQos(0, 1, false)
表示broker一次只把1个消息发给worker,直到这个worker发出了ack,才继续把下一个消息分给他。
static void Main(string[] args)
var factory = new ConnectionFactory() HostName = "localhost" ;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare("task_quene", true, false, false, null);
//设置qos
channel.BasicQos(0, 1, false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received +=async (model, args) =>
byte[] body = args.Body.ToArray();
var msg = Encoding.UTF8.GetString(body);
Console.WriteLine($"[-] Task msg received");
await Task.Delay(msg.Length * 1000);//模拟耗时任务
Console.WriteLine(" [x] Task 0 Done", msg);
//手动发送ack,必须在同一个channel里发送
channel.BasicAck(args.DeliveryTag, false);
;
channel.BasicConsume("task_quene", false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.ReadKey();
消费者消费消息有两种模式:
在上面的例子中我们看到的其实就是推模式,使用的是channel.BasicConsume
方法。而拉模式需要使用channel.BasicGet
方法。如:
var response=channel.BasicGet("task_quene",autoAck:false);
var body=response.Body;
channel.BasicAck(response.Envelope.DeliveryTag,false);
注意:
BasicGet一次只能获取一条消息,且不能将其放到一个循环里来替代BasicComsume,否则会严重影响RabbitMQ的性能。如果要实现高吞吐量,则应该使用BasicConsume。
一个消息可以被多个消费者消费,此时就用到了交换机。
回顾下我们之前的例子:
RabbitMQ消息模型的设计核心思想是:生产者从来不把消息直接丢给队列,它甚至都不知道要把消息丢给哪个队列。
取而代之的是生产者只需要把消息丢给交换机(exchange)。交换机决定把消息丢给哪个队列,或者丢给哪些队列,或者丢弃这个消息。
下图就是发布订阅的模型:
交换机分为以下几种类型:
fanout
交换机1. 声明一个临时队列和一个交换机
消费者需要声明一个临时队列,这个临时队列只能是消费者声明。当消费者断开连接时,这个队列将会被删除。(此场景适用于我们的logs接收测试,因为消费者不关系之前的日志是什么)。临时队列的名称类似于amq.gen-JzTY20BRgKO-HjmUJj0wLg
格式。
消费者或者生产者也要声明一个交换机。
//创建临时队列(只能是消费者)
var quenuName = channel.QueueDeclare().QueueName;
//创建交换机(生产者或消费者)
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
2. 将交换机与队列绑定
消费者需要将临时队列与交换机进行绑定。
channel.QueueBind(queue:quenuName, exchange:"logs",routingKey:"");
3. 最终模型与代码
与之前例子最大的不同是,此时生产者需要把消息发送到交换机而不是某个队列上。在发送时我们就需要提供一个routingKey
,但是fanout
模式的交换机会忽略这个参数。
这样当我们发送消息时,与exchange关联的所有队列都可以收到这个消息。
生产者:
static void Main(string[] args)
var factory = new ConnectionFactory() HostName = "localhost" ;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
Console.WriteLine("请输入要发送的消息内容:");
string msg = null;
while (!string.IsNullOrEmpty(msg = Console.ReadLine()))
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
Console.WriteLine("[x] Sent 0", msg);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
消费者:
这里我们使用channel.QueueDeclare().QueueName
创建一个临时队列并返回队列名称。当消费者断开连接时,这个队列将会被删除。(此场景试用与我们的logs接收,因为消费者不关心之前的日志是什么)
static void Main(string[] args)
var factory = new ConnectionFactory() HostName = "localhost" ;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
//创建一个临时队列
var queueName = channel.QueueDeclare().QueueName;
//交换机与队列的绑定
channel.QueueBind(queue:queueName, exchange:"logs",routingKey: "");
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, args) =>
byte[] body = args.Body.ToArray();
var msg = Encoding.UTF8.GetString(body);
Console.WriteLine($"[-] Task msg received");
//手动发送ack,必须在同一个channel里发送
channel.BasicAck(args.DeliveryTag, false);
;
channel.BasicConsume(queueName, false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.ReadKey();
在上面的打印log例子中,所有的消费者都能收到同一个log消息。在这一节我们将通过路由的方式来订阅消息的子集。例如一个消费者只用来接收critical
级别的消息,而其他消费者接收所有消息。
在上一小节中,消费者将队列与交换机绑定时用到了channel.QueueBind
方法,表示“这个队列对这个交换机发出消息很感兴趣,愿意接收这些消息”。这个绑定方法还接收一个routingKey
参数,取决于不同的交换机类型,这个参数有可能会被忽略(例如我们之前用到的fanout
交换机)。
Direct
交换机所以这里我们将以direct
类型的交换机为例,如果绑定队列时设置的routingKey
等于发送消息时设置的routingKey
,这个队列就可以收到消息。举例如下:
direct
类型的交换机X
下绑定了两个队列Q1和Q2。Q1的routingKey是orange
,Q2的routingKey是black
和green
。所以发送时如果消息的routingKey设置为orange
则Q1会接收到,如果是black
或green
则Q2会接收到,如果是其他的值,则会被交换